feat: broadcast shutdown notification to active threads (RFC #78 §1d Phase 1)#182
ruan330 wants to merge 6 commits
Conversation
Review — PR #182

Good feature — users currently get no warning when the broker shuts down. This fixes that.

✅ What looks good

🔴 Must fix before merge
1. Rebase onto latest `main`
2. CI not triggered

🟡 Non-blocking
3. k8s `terminationGracePeriodSeconds`
4. "Restarting" message on permanent shutdown

Summary: Valuable UX improvement. Rebase needed — pool and main.rs have both changed significantly.
Force-pushed `04d1e0e` → `82c8a7e`
masami-agent
left a comment
PR Review: #182
Summary
- Problem: Broker restarts kill conversations silently — users get no notification that the broker is going down.
- Approach: Add an `active_thread_ids()` accessor to `SessionPool`, handle SIGTERM alongside SIGINT, and broadcast a neutral notification to each active Discord thread before shutting down shards.
- Risk level: Low
Core Assessment
- Problem clearly stated: ✅ — well-motivated, linked to RFC #78 §1d
- Approach appropriate: ✅ — minimal, clean separation between pool (lifecycle) and main (Discord)
- Alternatives considered: ✅ — PR description covers three alternatives with clear reasoning
- Best approach for now: ✅ — Phase 1 scope is correct; no over-engineering
Findings
pool.rs — active_thread_ids()
Clean and minimal. Takes a read lock, returns an owned Vec<String>, releases immediately. No contention risk with the subsequent shutdown() write lock. ✅
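The snapshot-under-read-lock pattern described above can be sketched as follows. This is a minimal std-only stand-in, not the PR's code: the real pool presumably uses tokio's async `RwLock`, and `AcpConnection` here is an empty placeholder type.

```rust
use std::collections::HashMap;
use std::sync::RwLock;

// Placeholder for the real per-session ACP connection.
struct AcpConnection;

struct SessionPool {
    // thread_key ("platform:id") -> live connection
    active: RwLock<HashMap<String, AcpConnection>>,
}

impl SessionPool {
    /// Snapshot active session keys: take the read lock briefly and
    /// return an owned Vec, so the lock is released before any caller
    /// proceeds to take the shutdown write lock.
    fn active_thread_ids(&self) -> Vec<String> {
        self.active.read().unwrap().keys().cloned().collect()
    }
}
```

Returning an owned `Vec<String>` rather than a borrowed view is what makes the "no contention with the subsequent write lock" property hold: the guard is dropped inside the method.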
main.rs — SIGTERM handling
The tokio::select! over ctrl_c() and sigterm.recv() is the standard pattern. The .expect("install SIGTERM handler") is acceptable here — if we cannot install a signal handler at startup, panicking is the right call since the broker would be unable to shut down gracefully anyway. ✅
main.rs — broadcast loop
The strip_prefix("discord:") + parse::<u64>() approach correctly handles the {platform}:{id} key format used by AdapterRouter. Non-Discord keys (e.g. slack:...) are silently skipped via the parse failure path, which is the right behavior for now. ✅
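The key-handling behavior described here can be isolated as a small pure function. The function name `discord_channel_id` is illustrative; the PR inlines this logic in the broadcast loop.

```rust
/// Extract the numeric Discord channel id from a pool thread key of the
/// form "platform:id"; returns None for non-Discord keys.
fn discord_channel_id(thread_key: &str) -> Option<u64> {
    // Non-Discord keys (e.g. "slack:C123") are left unchanged by
    // strip_prefix and then fail the u64 parse, so they are skipped.
    let raw_id = thread_key.strip_prefix("discord:").unwrap_or(thread_key);
    raw_id.parse::<u64>().ok()
}
```

Note that a key like `slack:12345` is also skipped, because the unstripped `slack:` prefix makes the `u64` parse fail.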
main.rs — error handling
channel.say() failures are logged as warnings and do not block shutdown. Correct — a dead Discord connection should never prevent pool cleanup. ✅
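The log-and-continue behavior can be sketched as a std-only loop. `notify_all` and its `send` callback are hypothetical stand-ins; the PR sends via `channel.say()` and logs via `tracing::warn!`.

```rust
/// Best-effort broadcast: log each failure and keep going, never abort.
/// Returns the number of failed sends for observability.
fn notify_all(keys: &[&str], send: impl Fn(&str) -> Result<(), String>) -> usize {
    let mut failures = 0;
    for &key in keys {
        if let Err(e) = send(key) {
            // Log and continue: a dead connection must not block shutdown.
            eprintln!("failed to post shutdown notification to {key}: {e}");
            failures += 1;
        }
    }
    failures
}
```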
Review Summary
🔴 Blockers
`Cargo.lock` version bump is out of scope. The diff changes `Cargo.lock` from `0.7.6` → `0.7.7`. On `main`, `Cargo.toml` is already `0.7.7` but `Cargo.lock` is stale at `0.7.6`. While the sync is technically correct, it is unrelated to the shutdown broadcast feature and should not be bundled into this PR. Please revert the `Cargo.lock` change and let it be fixed in a separate housekeeping commit, or confirm this was an intentional rebase artifact that the maintainers are okay with.
💬 Questions
- Non-Discord shutdown path also lacks SIGTERM. The `else` branch (line ~194 on main: `"running without discord, press ctrl+c to stop"`) still only awaits `ctrl_c()`. In a Slack-only or headless deployment, `docker stop` / `kill` would still bypass the graceful path. Was this intentionally left out of scope, or should it be addressed here for consistency?
- Slack adapter threads. When Slack support is active, `thread_key` will be `slack:{id}`. The broadcast loop silently skips non-Discord keys: `strip_prefix("discord:")` leaves the key unchanged, then `parse::<u64>()` fails on `slack:...`. This is fine for Phase 1, but worth noting — should there be a `tracing::debug!` for skipped non-Discord keys so operators know the broadcast was Discord-only?
🔧 Suggested Changes
- Consider a timeout on the broadcast loop. If there are many active sessions and Discord is slow or rate-limiting, the broadcast could delay shutdown significantly. A `tokio::time::timeout` wrapping the entire loop (e.g. 5–10 seconds) would provide a safety net. Not blocking, but worth considering for production robustness.
- Pool shutdown path. At first glance `pool.shutdown()` looks like it could run twice on the Discord path, but it does not: after the spawned task runs `shard_manager.shutdown_all()`, execution continues past `client.start().await?` and hits the single `shutdown_pool.shutdown().await` at line ~208, while the broadcast task only broadcasts and shuts down shards. So the pool is shut down exactly once, which is correct. However, the broadcast task clones `pool` as `broadcast_pool` but never calls shutdown on it; since this is read-only usage, a name like `notification_pool` or just `pool_ref` would signal that more clearly.
ℹ️ Info
- The `Cargo.lock` discrepancy on `main` (`Cargo.toml` = 0.7.7, `Cargo.lock` = 0.7.6) is a pre-existing issue. It should be fixed, but in a separate commit.
- RFC #78 §1d Phase 2 (session persistence) is correctly deferred. The neutral wording ("you can continue" vs "will resume") is the right call given no state is persisted yet.
⚪ Nits
- Line length on the `channel.say()` string literal is ~90 chars — fine, but if you want to break it for readability, a `const` at the top of the block would work nicely: `const SHUTDOWN_MSG: &str = "🔄 Broker restarting. You can continue the conversation when the broker is back.";`
Verdict
COMMENT — One blocker (Cargo.lock scope) and two questions to resolve. The core implementation is clean and well-designed. Once the Cargo.lock issue is addressed and the questions are answered, this should be ready for approval.
masami-agent
left a comment
Hi @ruan330 — leaving some inline comments on specific areas. The overall review is in my earlier comment above. Thanks for the solid work on this!
```diff
 [[package]]
 name = "openab"
-version = "0.7.6"
+version = "0.7.7"
```
🔴 BLOCKER — This version bump (0.7.6 → 0.7.7) is a pre-existing Cargo.lock / Cargo.toml sync issue on main, not related to the shutdown broadcast feature. Please revert this file from the PR to keep the scope clean. The lockfile sync can be fixed in a separate housekeeping PR or commit:

```shell
git checkout origin/main -- Cargo.lock
```

```rust
// systemctl stop / docker stop / kill deliver SIGTERM; without handling
// it, the broker would exit without running the broadcast below.
let shard_manager = client.shard_manager.clone();
let broadcast_pool = pool.clone();
```
💬 Question — The else branch below (non-Discord / Slack-only mode, around line 194 on main) still only awaits ctrl_c(). In a Slack-only or headless deployment, docker stop / kill would still bypass the graceful path. Was this intentionally left out of scope for this PR? If so, might be worth a // TODO: comment noting that SIGTERM handling should be added here too.
```rust
info!(count = thread_ids.len(), "broadcasting shutdown notification");
for thread_key in thread_ids {
    // thread_key format: "platform:id" (e.g. "discord:1234567890").
    let raw_id = thread_key
```
🟡 Suggestion — Consider wrapping the broadcast loop in a timeout to guard against Discord rate limits or network issues delaying shutdown:

```rust
let _ = tokio::time::timeout(
    std::time::Duration::from_secs(10),
    async {
        for thread_key in thread_ids {
            // ... existing broadcast logic ...
        }
    },
).await;
```

Not blocking — just a safety net for production. If Discord is slow, you probably still want the process to exit within a bounded time.
```rust
    .strip_prefix("discord:")
    .unwrap_or(&thread_key);
if let Ok(id) = raw_id.parse::<u64>() {
    let channel = serenity::model::id::ChannelId::new(id);
```
ℹ️ Info — When Slack adapter threads exist, their keys (`slack:{id}`) will pass through `strip_prefix("discord:")` unchanged (returns `None` → `unwrap_or` gives the full key), then `parse::<u64>()` fails on `slack:12345` and they're silently skipped. This is correct behavior for Phase 1, but a `tracing::debug!` here for skipped non-Discord keys would help operators understand why some sessions didn't get notifications:

```rust
let raw_id = thread_key
    .strip_prefix("discord:")
    .unwrap_or(&thread_key);
if let Ok(id) = raw_id.parse::<u64>() {
    // ... send notification ...
} else {
    tracing::debug!(thread_key = %thread_key, "skipping non-Discord thread for shutdown broadcast");
}
```

```rust
    )
    .await
{
    tracing::warn!(thread_key = %thread_key, error = %e, "failed to post shutdown notification");
```
⚪ Nit — The message string is a bit long inline. Consider extracting it as a constant for readability:

```rust
const SHUTDOWN_MSG: &str =
    "🔄 Broker restarting. You can continue the conversation when the broker is back.";
```

Minor — skip if you prefer it inline.
OpenAB PR Screening
Screening report

## Intent

This PR tries to make broker shutdowns visible to users who are actively chatting in Discord threads. Today, a broker restart or stop can terminate an in-flight conversation without explanation, leaving a partial reply or a dead thread with no operator-visible or user-visible signal. The proposed change addresses that gap by sending a shutdown notice to all currently active Discord threads before the broker tears down the session pool.

## Feat

This is a feature with some operational-hardening characteristics. In plain terms, when the OpenAB broker receives a shutdown signal (SIGINT or SIGTERM), it notifies active threads before exiting.

## Who It Serves

The primary beneficiary is Discord end users in active threads, with secondary value for maintainers and operators who want shutdown behavior to be less opaque and easier to reason about during deploys, restarts, or crashes followed by restart.

## Rewritten Prompt

Implement Phase 1 of graceful shutdown notification for the Discord broker. When the process receives either SIGINT or SIGTERM, broadcast a short notice to every active thread before teardown.

## Merge Pitch

This is worth advancing because it closes a user-facing failure mode with low implementation surface area: silent thread death during broker restarts. The change is operationally useful, narrow in scope, and aligned with RFC #78 Phase 1. The risk profile is moderate-low. The likely reviewer concern is not whether the feature is useful, but whether shutdown-path messaging is too coupled to Discord delivery, too optimistic under rate limits, or under-tested for edge cases like invalid thread IDs and partial shutdown failures.

## Best-Practice Comparison

OpenClaw principles that are relevant here:

Hermes Agent principles that are relevant here:

Net assessment:

## Implementation Options

### Option 1: Minimal signal-path broadcast

Keep the current approach: handle SIGTERM alongside SIGINT and broadcast from the signal path. This is the most conservative path. It preserves existing boundaries reasonably well and keeps the PR tightly scoped to user notification.

### Option 2: Lifecycle notifier abstraction

Introduce a small shutdown notifier component or trait owned by the broker layer. The pool exposes active session identifiers, while the broker invokes a notifier responsible for formatting and delivering shutdown notices. The initial implementation still targets Discord threads, but the shutdown path becomes easier to test and less ad hoc. This is the balanced option. It adds a small abstraction without dragging Discord concerns into the pool.

### Option 3: Broader graceful-shutdown framework

Implement a more complete shutdown coordinator: optional drain window for in-flight streams, bounded concurrency for thread notifications, structured shutdown result logging, and groundwork for Phase 2 persistence or resumable session metadata. This is the ambitious option. It better matches mature gateway/runtime patterns, but it expands scope and risks turning a clean Phase 1 merge into a broader design exercise.

### Comparison Table

## Recommendation

Recommend the balanced version of Option 1 leaning slightly toward Option 2: merge the shutdown notification behavior now, but keep the design disciplined enough that notification delivery is clearly broker-owned and easy to evolve. That is the right step for follow-up discussion because it solves the real user problem immediately without pretending to solve persistence or resumability. If reviewers are comfortable with the current narrow accessor plus `main.rs`-level broadcast, Option 1 is acceptable for Phase 1.
Force-pushed `82c8a7e` → `fa106cd`
Force-pushed.

Mapping to the screening concerns:

Design alternatives considered (before picking pool-owned addressing):

Tests: 43 passed, clippy clean. CI green.
Force-pushed `fa106cd` → `12aa620`
Force-pushed.

Tests: 43 passed, clippy clean. CI green.
Small maintainer note: the earlier CI failure was not a design objection to the shutdown feature itself. The real issue was branch drift against current `main`.
Handoff note: the earlier CI failures were merge-drift noise, not a feature-design rejection. We rebased/fixed the branch against current `main`.
masami-agent
left a comment
PR Review: #182
Summary
- Problem: When the broker shuts down (SIGINT/SIGTERM), active conversations die silently mid-reply with no user-facing signal. RFC #78 §1d Phase 1.
- Approach: Pool-owned addressing + atomic shutdown flag + parallel broadcast via `JoinSet` with timeout, platform-neutral through the `ChatAdapter` abstraction.
- Risk level: Medium — touches core pool admission logic and shutdown path.
Core Assessment
- Problem clearly stated: ✅ — Excellent PR description with design rationale, behavior notes, and test plan.
- Approach appropriate: ✅ — Pool-owns-addressing is the right call. No parallel cache in the adapter layer, single source of truth.
- Alternatives considered: ✅ — PR description explicitly contrasts with the original Discord-specific approach and explains why the multi-platform refactor makes this the right home.
- Best approach for now: ✅ — Minimal, well-scoped. Deliberately avoids session persistence (Phase 2 follow-up).
Findings
Shutdown admission invariant (pool.rs) — The 4-point check in `get_or_create` is thorough and correct:

- Fast-fail on entry (cheap, avoids spawning ACP process).
- Inside the `state.read()` block (catches shutdowns before this task ran).
- After `conn.lock()` with a `state.read()` barrier (synchronizes with `begin_shutdown`'s write-lock flag flip — this is the subtle one, and the comment explains it well).
- Inside the final `state.write()` before insert (atomic with flag-flip + snapshot).
The begin_shutdown() method correctly holds the state write lock while flipping the AtomicBool and snapshotting addresses, so any admission that committed before the lock is in the snapshot, and any that comes after sees the flag. Sound.
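The flag-flip-plus-snapshot critical section can be sketched with std primitives standing in for the pool's async locks. All names here are illustrative; the real `begin_shutdown` is async and snapshots `(ChannelRef, Adapter)` pairs rather than strings.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::RwLock;

// Stand-in for the stored (ChannelRef, Adapter) addressing pair.
type Address = String;

struct SessionPool {
    shutting_down: AtomicBool,
    addresses: RwLock<HashMap<String, Address>>,
}

impl SessionPool {
    /// Flip the shutdown flag and snapshot addresses in ONE critical
    /// section: any admission that committed before we took the write
    /// lock is in the snapshot; any admission after it sees the flag.
    fn begin_shutdown(&self) -> Vec<(String, Address)> {
        let guard = self.addresses.write().unwrap();
        self.shutting_down.store(true, Ordering::Release);
        guard.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
    }

    /// Fast-fail check used at admission points.
    fn is_shutting_down(&self) -> bool {
        self.shutting_down.load(Ordering::Acquire)
    }
}
```

Holding the write lock across both the flag store and the snapshot is what makes "in the snapshot XOR rejected at admission" an invariant rather than a race.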
addresses invariant — addresses.keys() == active.keys() is maintained across all mutation paths: insert, stale rebuild, eviction, cleanup_idle, and shutdown. I traced each one — all correct.
Broadcast timeout (adapter.rs) — The biased select with set.shutdown().await is correct. set.shutdown() cancels remaining tasks and waits for them to drop, which is the right behavior for HTTP calls that should be cancellable via tokio's cooperative cancellation.
Bot-loop prevention (adapter.rs) — Smart catch. During the shutdown window, the bot's own broadcast message could arrive as a new event on Slack, trigger handle_message, get rejected by the pool, and the rejection reply would loop. Deserializing sender_json to check is_bot breaks the cycle cleanly. The Deserialize derive on SenderContext is the minimal change needed.
Signal handling (main.rs) — wait_for_shutdown_signal() with #[cfg(unix)] SIGTERM + ctrl_c, and a ctrl_c-only fallback for non-Unix, is clean and correct. Both the Discord and Slack-only branches use the same flow.
discord.rs / slack.rs — Purely rustfmt line wrapping. No logic changes. ✅
Review Summary
🔴 Blockers
(none)
💬 Questions
- `get_or_create` signature change — The new `channel: &ChannelRef, adapter: &Arc<dyn ChatAdapter>` parameters are passed through from `handle_message`. Is there any other call site for `get_or_create` outside of `handle_message`? If so, those callers would need updating. (I only see the one call site in `adapter.rs`, so this should be fine — just confirming.)
🔧 Suggested Changes
- Commit history cleanup — The branch has merge commits from "Codex Temp" (`48034166`, `9e7deaa9`, `13563f9c`). Since the project uses squash merge, this will be cleaned up at merge time, but if you're planning any further pushes to this branch, consider rebasing to a single commit for a cleaner `git log` during review.
ℹ️ Info
- The `_sync = self.state.read().await` pattern in check #3 is unusual (acquiring a read lock purely for synchronization), but the comment explains the intent clearly. The RwLock acquire provides the happens-before relationship needed to make the `AtomicBool` store from `begin_shutdown` visible. This is a valid and well-documented pattern here.
- `broadcast_shutdown` sends via `adapter.send_message()` (Discord/Slack HTTP API), not through ACP connections, so `pool.shutdown()` clearing `active` connections afterward does not interfere with in-flight broadcast sends. The sequencing in `main.rs` (broadcast → shutdown shards → `pool.shutdown()`) is correct.
- The `AtomicBool` with Acquire/Release ordering is technically redundant with the RwLock-protected checks in points #2 and #4 (which already have happens-before from the lock), but it's needed for the fast-fail path (#1) and the read-barrier path (#3). Belt-and-suspenders for shutdown correctness is the right trade-off.
⚪ Nits
(none — code is clean, comments are thorough)
Verdict
APPROVE — This is a well-designed, well-documented feature. The shutdown admission invariant is sound, the race condition coverage is thorough, and the platform-neutral broadcast approach is the right architecture. No blockers.
obrutjack
left a comment
Merge checklist verified:
- ✅ CI all green (cargo check + 7 Docker smoke tests)
- ✅ masami-agent reviewed — no blockers, thorough analysis of shutdown admission invariant
- ✅ Multi-platform broadcast via ChatAdapter (Discord + Slack)
- ✅ Parallel broadcast with 10s timeout (JoinSet + deadline)
- ✅ SIGTERM + SIGINT handling (unix + non-unix)
- ✅ Bot-loop prevention during shutdown window
- ✅ Pool admission 4-layer shutdown check — sound
- ✅ Neutral wording (no "restarting" assumption)
- ✅ No version regression
Well-designed feature. The pool-owned addressing approach is the right architecture. Pending @thepagent.
…penabdev#78 §1d Phase 1)

On SIGINT or SIGTERM the broker now posts a short notification to every active thread across all configured adapters (Discord, Slack, …) before closing shards. Users get a clear signal that the broker is going away instead of replies cutting off silently mid-stream.

Discord Discussion URL: https://discord.com/channels/1488041051187974246/1495050997461024879

- `SessionPool` stores each active session's addressing info alongside its connection (`addresses: HashMap<thread_key, (ChannelRef, Adapter)>`), kept in lockstep with `state.active` so broadcast has a single source of truth. No parallel cache in the adapter layer.
- `SessionPool::begin_shutdown()` flips the shutdown flag and snapshots `addresses` under the state write lock, making admission and snapshot atomic with respect to each other.
- `AdapterRouter::broadcast_shutdown(message, timeout)` calls `begin_shutdown`, then posts the notification to every snapshot entry in parallel via `tokio::task::JoinSet`. A `tokio::select!` with a 10-second deadline caps total broadcast time so shutdown itself never blocks on a slow platform.
- `main.rs` shutdown handler listens for SIGTERM in addition to SIGINT (`#[cfg(unix)]`-gated with a ctrl_c-only fallback for non-Unix), and runs `broadcast_shutdown` before `shard_manager.shutdown_all()`. Both the Discord branch and the Slack-only branch use the same flow via a shared `wait_for_shutdown_signal()` helper.

Every session in the pool at broadcast time is either:

1. In the broadcast snapshot (receives the notification), or
2. Rejected at admission (receives an inline "Bot is shutting down" notice through `handle_message`'s pool-error path).

`get_or_create` checks `is_shutting_down()` at four atomic points:

1. Fast-fail on entry (cheap).
2. Inside the initial `state.read()` block (catches shutdowns that ran before we reached this task).
3. Before the `return Ok(())` on the existing-alive-session path, with a `state.read()` barrier to synchronize with `begin_shutdown`'s `state.write()` flag flip (catches shutdowns that started while we were waiting on the per-connection mutex).
4. Inside the final `state.write()` block before insert (atomic with `begin_shutdown`'s flag flip + snapshot).

- Notification is best-effort: delivery errors and the 10-second deadline both fall through to normal pool teardown.
- Wording is intentionally neutral ("Context will reset on return") — we don't promise automatic session resume; RFC openabdev#78 Phase 2 persistence is a separate follow-up.
- Per-channel parallel send is safe under Discord/Slack rate limits (limits are per-channel, not per-adapter) and is ~10× faster than sequential for deployments with several active threads.
- `cargo test` passes: 43 passed, 0 failed.
- `cargo clippy --all-targets -- -D warnings` clean.
- Verified on bare-metal and Docker brokers: SIGTERM triggers the `broadcasting shutdown notification count=N` log and the message arrives in every active thread.

Relates to: openabdev#78, openabdev#75
Rebased onto current `main`.

Summary of conflict resolution:

Tests:

New head SHAs:

Thanks for your patience with the rebase churn — happy to adjust further if reviewers want a different shape.
Force-pushed `13563f9` → `44a9cdd`
四法師 Collaborative Triage Review — PR #182 (2026-05-01)

Verdict: CHANGES_REQUESTED — Design is sound and well-documented. Blocking on rebase against current `main`.

🟢 INFO — What looks good

All four monks agree the core design is correct:

🟡 NIT — Non-blocking suggestions

🔴 SUGGESTED CHANGES — Blocking

1. Rebase Required (all four monks). PR merge state is DIRTY.
2. Rebase Risk: Session Persistence Regression (擺渡). Critical finding: current `main` has moved since this branch was cut. If the rebase naively takes the PR version of the conflicting files, fixes now on `main` could be lost.
3. Rebase Risk: main.rs Feature Regression (擺渡). Current `main`'s main.rs already includes functionality (e.g. `shutdown_signal()`) that a naive rebase could drop.

📋 Review Participants

Next step for contributor @ruan330: Please rebase onto current `main`.

Follow-up: Precise Rebase Guidance (四法師 supplemental)

普渡法師 verified current `main`.

Rebase checklist for @ruan330

Once rebased and conflicts are resolved, re-request review.
chaodu-agent
left a comment
PR Review: #182 — Broadcast Shutdown Notification
CHANGES REQUESTED — Solid design with atomic admission + snapshot, but the PR has merge conflicts (mergeStateStatus: DIRTY) that must be resolved before merge. The main branch has evolved significantly since this PR was opened.
Baseline Check (Step 0)
Main branch already has:
- `shutdown_signal()` in `main.rs` — handles SIGINT + SIGTERM with `#[cfg(unix)]` gating
- `pool.shutdown()` — clears active sessions on exit
- `router` is `Arc<AdapterRouter>`, already cloned across Discord/Slack/Gateway/Cron tasks
- `get_or_create(&self, thread_id: &str)` — single-arg signature
- `PoolState` uses `active: HashMap<String, Arc<Mutex<AcpConnection>>>` (flat map)
Main does NOT have:
- Any broadcast notification to active threads on shutdown
- `SessionEntry` struct or `ChannelRef`/`ChatAdapter` stored in pool
- `begin_shutdown()` / `is_shutting_down()` on `SessionPool`
- `broadcast_shutdown()` on `AdapterRouter`
Verdict: Feature is entirely net-new. No stale/duplicate content.
Four Questions
1. What problem does it solve?
When the broker shuts down (SIGINT/SIGTERM, docker stop, helm uninstall), active conversations die silently. Users see replies cut off mid-stream with no explanation. RFC #78 Phase 1d calls this out.
2. How does it solve it?
- Pool owns addressing: `SessionPool` stores `ChannelRef` + `Arc<dyn ChatAdapter>` alongside each connection in a new `SessionEntry` struct, keeping them in lockstep with `sessions` (renamed from `active`).
- Atomic admission + snapshot: `begin_shutdown()` takes the state write lock, flips an `AtomicBool` flag, and snapshots all session addresses in one critical section.
- Four-point admission guard: `get_or_create` checks `is_shutting_down()` at entry, inside `state.read()`, after the per-connection mutex (with a read-lock barrier), and inside the final `state.write()` before insert.
- Platform-neutral broadcast: `AdapterRouter::broadcast_shutdown()` sends to all snapshot entries in parallel via `JoinSet`, capped by a 10s timeout.
- Unified signal handling: `wait_for_shutdown_signal()` handles both SIGINT and SIGTERM on Unix.
3. What was considered?
- This is a rewrite of the original Discord-specific approach against the multi-platform `AdapterRouter`/`ChatAdapter` abstraction from #259.
- Wording is intentionally neutral ("Context will reset on return") — no promise of session resume (that is RFC #78 Phase 2).
- Best-effort delivery: send errors and the 10s deadline both fall through to normal teardown.
4. Is this the best approach?
Strengths:
- The atomic admission + snapshot design is well-reasoned. The four-point shutdown check in `get_or_create` is thorough and each check point is justified in comments.
- Pool-owns-addressing is cleaner than a parallel cache in the adapter layer.
- `JoinSet` + `tokio::select!` with deadline is the right pattern for bounded parallel broadcast.
- The bot-loop prevention (skipping shutdown rejection for bot-authored events) is a nice edge case catch.
Concerns listed in Traffic Light below.
Traffic Light Summary
- 🔴 MERGE CONFLICTS — PR is in DIRTY merge state. Main has evolved (e.g., `shutdown_signal()` already exists, `PoolState` structure may have changed). Contributor must rebase onto current main and resolve conflicts.
- 🔴 Verify `is_bot` field — The shutdown-rejection path deserializes `SenderContext` and reads `.is_bot`. Confirm this field exists in the current `SenderContext` struct on main. If not, it needs to be added or the deserialization will silently default.
- 🟡 Consolidate signal handlers — Main already has `shutdown_signal()`. The PR adds `wait_for_shutdown_signal()` with identical logic. After rebase, consolidate to one.
- 🟡 `_shutdown_barrier` fragility — Consider extracting the read-lock-as-barrier pattern into a named helper method to prevent accidental removal.
- 🟢 Atomic admission design — The four-point shutdown check with justified comments at each point is excellent defensive programming.
- 🟢 Bot-loop prevention — Smart edge case handling for Slack where the bot's own broadcast could trigger a re-entry.
- 🟢 Timeout-bounded broadcast — `JoinSet` + `tokio::select!` with 10s deadline ensures shutdown is never blocked by a slow platform.
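The named-helper extraction suggested for the barrier might look like the following std-only sketch. `PoolState`, the field names, and `shutdown_read_barrier` are all illustrative; the real method would be async over tokio's `RwLock`.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::RwLock;

// Illustrative stand-in for the pool's shared state.
struct PoolState;

struct SessionPool {
    shutting_down: AtomicBool,
    state: RwLock<PoolState>,
}

impl SessionPool {
    /// Named replacement for the bare `let _sync = state.read().await;`
    /// barrier: acquiring and dropping the state read lock establishes
    /// a happens-before edge with begin_shutdown's write-lock section,
    /// making its shutdown-flag store visible to this thread.
    fn shutdown_read_barrier(&self) {
        let _barrier = self.state.read().unwrap();
    }

    /// Barrier-synchronized flag check (sketch; the PR also has a
    /// cheaper barrier-free fast-fail check at admission entry).
    fn is_shutting_down(&self) -> bool {
        self.shutdown_read_barrier();
        self.shutting_down.load(Ordering::Acquire)
    }
}
```

Giving the barrier a name and a doc comment makes the intent survive refactors, which is exactly the fragility the 🟡 item above flags.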
CI is all green (cargo check + 7 Docker smoke tests pass), but the merge conflicts must be resolved first. Previous reviews: masami-agent commented (no blockers), shaun-agent approved, obrutjack's approval was dismissed (likely due to new commits).
Action needed: Contributor @ruan330 please rebase onto current main and resolve conflicts. The shutdown_signal() function already exists on main — consolidate with your wait_for_shutdown_signal() during rebase.
Not planned for now.
Summary
On SIGINT or SIGTERM the broker now posts a short notification to every active thread across all configured adapters (Discord, Slack, …) before closing shards. Users get a clear signal that the broker is going away instead of replies cutting off silently mid-stream.
Discord Discussion URL: https://discord.com/channels/1488041051187974246/1495050997461024879
Relates to: #78, #75
Problem
Today, when the broker is restarted (`systemctl restart`, `docker stop`, Ctrl+C), conversations just die silently mid-reply. Users have no signal that anything is happening until the broker comes back, and in-flight streams leave partial messages with no explanation. RFC #78 §1d calls this out as Phase 1.

The original #182 (rebased before v0.7.6) was Discord-specific and done at the `main.rs` level. The multi-platform refactor in #259 makes that approach stale: Slack threads also need the notification, and the `AdapterRouter`/`ChatAdapter` abstraction is the right home for platform-neutral broadcast. This revision rewrites the feature against that architecture.

Design

**Pool owns addressing, not the router.** `SessionPool` now stores each active session's `ChannelRef` + `Arc<dyn ChatAdapter>` alongside the connection (`addresses: HashMap<thread_key, ...>`), kept in lockstep with `state.active` so broadcast has a single source of truth. No parallel cache in the adapter layer, no `pool.active_thread_ids()` filter, no prune task.

**Atomic admission and snapshot.** `SessionPool::begin_shutdown()` takes the state write lock, flips the shutdown flag, and snapshots `addresses` — all in one critical section. Any admission committed before us is in the snapshot. `get_or_create` checks `is_shutting_down()` at four points so every session is either in the snapshot or rejected at admission:

1. Fast-fail on entry.
2. Inside the initial `state.read()` block.
3. Before the `return Ok(())` on the existing-alive-session path, with a `state.read()` barrier synchronizing against `begin_shutdown`'s `state.write()` flag flip (closes the race where shutdown starts while we wait on the per-connection mutex).
4. Inside the final `state.write()` block before insert.

**Platform-neutral broadcast.** `AdapterRouter::broadcast_shutdown(message, timeout)` calls `pool.begin_shutdown()` and posts the notification to every snapshot entry in parallel via `tokio::task::JoinSet`. A `tokio::select!` with a configurable deadline (10s here) caps total broadcast time so shutdown itself never blocks on a slow platform.

**Unified signal handling.** `main.rs` uses a shared `wait_for_shutdown_signal()` helper for SIGINT + SIGTERM (`#[cfg(unix)]`-gated with a ctrl_c-only fallback for non-Unix). Both the Discord-enabled and Slack-only branches invoke `broadcast_shutdown` before tearing down adapters.

Behaviour notes
Tests & verification
- `cargo test` — 43 passed, 0 failed.
- `cargo clippy --all-targets -- -D warnings` — clean.
- Verified on bare-metal and Docker brokers: SIGTERM triggers the `broadcasting shutdown notification count=N` log and the message arrives in every active thread.

Test plan

- `cargo test`, `cargo clippy`, Docker smoke tests.
- With an active thread, send SIGTERM (`docker stop` / `kill -TERM`) and verify the thread receives the "Bot is shutting down" message before the pool tears down.